home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import apt
- import logging
- import time
- import sys
- from DistUpgradeView import DistUpgradeView, InstallProgress, FetchProgress
- from DistUpgradeConfigParser import DistUpgradeConfig
- import os
- import pty
- import apt_pkg
- import select
- import fcntl
- import string
- import re
- import subprocess
- from subprocess import call, PIPE, Popen
- import copy
- import apt.progress as apt
- from ConfigParser import NoSectionError, NoOptionError
-
class NonInteractiveFetchProgress(FetchProgress):
    """Fetch progress that prints one line for every finished download."""

    def updateStatus(self, uri, descr, shortDescr, status):
        """Forward the status to the base class and report finished items.

        The arguments are passed unchanged to ``FetchProgress.updateStatus``.
        When ``status`` equals ``dlDone`` a summary line with the uri, the
        current percentage and the current transfer rate is printed.
        """
        FetchProgress.updateStatus(self, uri, descr, shortDescr, status)
        if status != apt.progress.FetchProgress.dlDone:
            return
        speed = apt_pkg.SizeToStr(int(self.currentCPS))
        print('fetched %s (%.2f/100) at %sb/s' % (uri, self.percent, speed))
        # stdout is only flushed explicitly when it is a terminal
        if sys.stdout.isatty():
            sys.stdout.flush()
-
-
-
-
-
class NonInteractiveInstallProgress(InstallProgress):
    """Install progress handler for unattended upgrades.

    Sets up a non-interactive environment, answers conffile prompts with
    "n", re-runs failed maintainer scripts with tracing enabled and sends
    ctrl-c to the pty when nothing was read for ``self.timeout`` seconds.

    ``statusfd``, ``read``, ``percent``, ``status`` and ``statusChange`` are
    used but not defined here; presumably they come from the
    ``InstallProgress`` base class.
    """

    # (marker in the dpkg error message, script name, script argument,
    #  directory, True when the file name is "<pkg>.<script>")
    # The order is the order in which the markers are tested.
    _MAINTAINER_SCRIPTS = (
        ('post-installation', 'postinst', 'configure',
         '/var/lib/dpkg/info/', True),
        ('pre-installation', 'preinst', 'install',
         '/var/lib/dpkg/tmp.ci/', False),
        ('pre-removal', 'prerm', 'remove',
         '/var/lib/dpkg/info/', True),
        ('post-removal', 'postrm', 'remove',
         '/var/lib/dpkg/info/', True),
    )

    # pmconffile payload: 'current-conffile' 'new-conffile' ...
    _CONFFILE_RE = re.compile(r"\s*'(.*)'\s*'(.*)'.*")

    def __init__(self):
        """Prepare the environment and read the NonInteractive settings."""
        InstallProgress.__init__(self)
        logging.debug('setting up environ for non-interactive use')
        os.environ['DEBIAN_FRONTEND'] = 'noninteractive'
        os.environ['APT_LISTCHANGES_FRONTEND'] = 'none'
        # NOTE(review): the key is spelled "UPRADER" in the original; it is
        # kept byte-for-byte because whatever reads it is not visible here.
        os.environ['RELEASE_UPRADER_NO_APPORT'] = '1'
        self.config = DistUpgradeConfig('.')

        try:
            if self.config.getboolean('NonInteractive', 'ForceOverwrite'):
                apt_pkg.Config.Set('DPkg::Options::', '--force-overwrite')
        except (NoSectionError, NoOptionError):
            # the setting is optional
            pass

        # seconds without any activity before ctrl-c is sent
        self.timeout = 2400
        try:
            self.timeout = self.config.getint('NonInteractive',
                                              'TerminalTimeout')
        except Exception as e:
            # best effort: keep the default when the value cannot be read
            logging.debug('no TerminalTimeout found, using %s (%s)'
                          % (self.timeout, e))

    def _find_maintainer_script(self, pkg, errormsg):
        """Return (name, argument, path) for the failed script, or None."""
        for (marker, name, argument, prefix,
             per_package) in self._MAINTAINER_SCRIPTS:
            if marker in errormsg:
                if per_package:
                    path = '%s/%s.%s' % (prefix, pkg, name)
                else:
                    path = '%s/%s' % (prefix, name)
                return (name, argument, path)
        return None

    def _dpkg_status_field(self, pkg, field):
        """Return ``field`` from ``dpkg-query -s pkg``, or None if absent."""
        try:
            # argument list instead of a shell pipeline: the package name
            # is never interpreted by a shell
            proc = Popen(['dpkg-query', '-s', pkg], stdout=PIPE,
                         universal_newlines=True)
        except OSError as e:
            logging.warning("can not run dpkg-query for '%s': %s" % (pkg, e))
            return None
        output = proc.communicate()[0]
        for line in output.splitlines():
            (key, sep, value) = line.partition(':')
            if sep and key == field:
                return value.strip()
        return None

    def error(self, pkg, errormsg):
        """Log a dpkg error and re-run the failed maintainer script.

        pkg -- package name as reported by dpkg; for a pre-installation
               failure it is treated as a path to a .deb file
        errormsg -- dpkg's message, used to tell which script failed

        The script is run again with tracing options for its interpreter
        and the exit status is logged. Always returns None.
        """
        logging.error("got a error from dpkg for pkg: '%s': '%s'"
                      % (pkg, errormsg))
        # dict() yields an independent plain mapping; on Python 2 a
        # copy.copy(os.environ) shares its storage with the real
        # environment, so the debug settings below would leak into it.
        environ = dict(os.environ)
        environ['PYCENTRAL'] = 'debug'

        found = self._find_maintainer_script(pkg, errormsg)
        if found is None:
            print('UNKNOWN (trigger?) dpkg/script failure for %s (%s) '
                  % (pkg, errormsg))
            return None
        (name, argument, maintainer_script) = found
        if not os.path.exists(maintainer_script):
            logging.error("can not find failed maintainer script '%s' "
                          % maintainer_script)
            return None

        # read the script once and close the file again
        with open(maintainer_script) as script_file:
            script_text = script_file.read()
        # the interpreter is the first word after "#!" on the first line
        shebang_words = script_text.split('\n', 1)[0][2:].strip().split()
        if not shebang_words:
            logging.error("can not find the interpreter of '%s' "
                          % maintainer_script)
            return None
        interp = shebang_words[0]

        debug_opts = []
        if 'bash' in interp or '/bin/sh' in interp:
            debug_opts = ['-ex']
        elif 'perl' in interp:
            debug_opts = ['-d']
            environ['PERLDB_OPTS'] = 'AutoTrace NonStop'
        else:
            # run the script without extra debug options
            logging.warning("unknown interpreter: '%s'" % interp)

        # scripts that source the debconf confmodule are run through the
        # debconf frontend instead
        if '. /usr/share/debconf/confmodule' in script_text:
            environ['DEBCONF_DEBUG'] = 'developer'
            environ['DEBIAN_HAS_FRONTEND'] = '1'
            interp = '/usr/share/debconf/frontend'
            debug_opts = ['sh', '-ex']

        cmd = [interp] + debug_opts + [maintainer_script, argument]
        if name == 'postinst':
            version = self._dpkg_status_field(pkg, 'Config-Version')
            if version is not None:
                cmd.append(version)
        elif name == 'preinst':
            # pkg is a path like ".../name_version_arch.deb"
            deb_name = os.path.basename(pkg).split('_')[0]
            version = self._dpkg_status_field(deb_name, 'Version')
            if version is not None:
                cmd.append(version)

        print(cmd)
        logging.debug("re-running '%s' (%s)" % (cmd, environ))
        ret = subprocess.call(cmd, env=environ)
        logging.debug('%s script returned: %s' % (name, ret))
        return None

    def conffile(self, current, new):
        """Answer a dpkg conffile prompt with "n" after a short pause."""
        logging.warning("got a conffile-prompt from dpkg for file: '%s'"
                        % current)
        time.sleep(5)
        try:
            os.write(self.master_fd, 'n\n')
        except Exception as e:
            logging.error("error '%s' when trying to write to the conffile"
                          % e)

    def startUpdate(self):
        """Start the update and reset the activity timestamp."""
        InstallProgress.startUpdate(self)
        self.last_activity = time.time()

    def _read_status_line(self):
        """Read one newline-terminated line from statusfd; None on EOF."""
        fd = self.statusfd.fileno()
        while not self.read.endswith('\n'):
            chunk = os.read(fd, 1)
            if not chunk:
                # the other end closed the fd; without this check the
                # loop would never terminate
                return None
            self.read += chunk
        line = self.read
        self.read = ''
        return line

    def _handle_status_line(self, line):
        """Dispatch one "status:pkg:percent:message" line."""
        try:
            # maxsplit keeps colons inside the message in one field
            (status, pkg, percent, status_str) = line.split(':', 3)
        except ValueError:
            logging.warning("ignoring malformed status line: '%s'"
                            % line.strip())
            return
        if status == 'pmerror':
            self.error(pkg, status_str)
        elif status == 'pmconffile':
            match = self._CONFFILE_RE.match(status_str)
            if match:
                self.conffile(match.group(1), match.group(2))
        elif status == 'pmstatus':
            # NOTE(review): status_str still carries the trailing newline
            # while self.status is stored stripped, so this comparison
            # looks like it is always true -- confirm before changing it.
            if float(percent) != self.percent or status_str != self.status:
                self.statusChange(pkg, float(percent), status_str.strip())
                self.percent = float(percent)
                self.status = status_str.strip()
                sys.stdout.write('[%s] %s: %s\n'
                                 % (float(percent), pkg, status_str.strip()))
                sys.stdout.flush()

    def updateInterface(self):
        """Process pending dpkg status lines and echo terminal output.

        Sends ctrl-c to the pty when no activity was seen for
        ``self.timeout`` seconds. Returns None.
        """
        if self.statusfd is None:
            return None
        if self.last_activity + self.timeout < time.time():
            logging.warning('no activity %s seconds (%s) - sending ctrl-c'
                            % (self.timeout, self.status))
            os.write(self.master_fd, chr(3))

        # status lines written by dpkg
        while select.select([self.statusfd], [], [], 0.1)[0]:
            line = self._read_status_line()
            if line is None:
                break
            self.last_activity = time.time()
            self._handle_status_line(line)

        # terminal output of the child
        while select.select([self.master_fd], [], [], 0.1)[0]:
            self.last_activity = time.time()
            try:
                data = os.read(self.master_fd, 1)
            except OSError:
                # reading the pty failed; stop without flushing, as before
                return None
            if not data:
                break
            sys.stdout.write('%s' % data)
        sys.stdout.flush()
        return None

    def fork(self):
        """Fork with a pty attached to the child and return the pid."""
        logging.debug('doing a pty.fork()')
        os.environ['TERM'] = 'dumb'
        (self.pid, self.master_fd) = pty.fork()
        if self.pid != 0:
            logging.debug('pid is: %s' % self.pid)
        return self.pid
-
-
-
class DistUpgradeViewNonInteractive(DistUpgradeView):
    """Non-interactive version of the upgrade view.

    Every question is answered without user input and messages go to the
    log instead of a user interface.
    """

    def __init__(self, datadir=None, logdir=None):
        """Create the progress objects; datadir and logdir are not used."""
        self.config = DistUpgradeConfig('.')
        self._fetchProgress = NonInteractiveFetchProgress()
        self._installProgress = NonInteractiveInstallProgress()
        self._opProgress = apt.progress.OpProgress()
        # NOTE(review): this replaces sys.__excepthook__, not
        # sys.excepthook, so the interpreter does not call it directly.
        # Kept as-is; it may be meant for a handler that chains to
        # sys.__excepthook__ -- confirm.
        sys.__excepthook__ = self.excepthook

    def excepthook(self, type, value, traceback):
        """Log an uncaught exception and call confirmRestart()."""
        logging.error("got exception '%s': %s " % (type, value))
        self.confirmRestart()

    def getOpCacheProgress(self):
        """Return the OpProgress object created in __init__."""
        return self._opProgress

    def getFetchProgress(self):
        """Return the fetch progress object."""
        return self._fetchProgress

    def getInstallProgress(self, cache=None):
        """Return the install progress object; cache is not used."""
        return self._installProgress

    def updateStatus(self, msg):
        """Do nothing; there is no status display in this view."""
        pass

    def setStep(self, step):
        """Do nothing; the upgrade steps are not displayed in this view.

        The steps of an upgrade are:
         1. Analyzing the system
         2. Updating repository information
         3. Performing the upgrade
         4. Post upgrade stuff
         5. Complete
        """
        pass

    def confirmChanges(self, summary, changes, downloadSize, actions=None,
                       removal_bold=True):
        """Log the planned changes and confirm them.

        removal_bold is accepted but not used. Always returns True.
        """
        DistUpgradeView.confirmChanges(self, summary, changes, downloadSize,
                                       actions)
        logging.debug("toinstall: '%s'" % self.toInstall)
        logging.debug("toupgrade: '%s'" % self.toUpgrade)
        logging.debug("toremove: '%s'" % self.toRemove)
        return True

    def askYesNoQuestion(self, summary, msg):
        """Answer every Yes/No question with True ('Yes')."""
        return True

    def confirmRestart(self):
        """Return the NonInteractive/RealReboot setting, False if unreadable."""
        logging.debug('confirmRestart() called')
        try:
            return self.config.getboolean('NonInteractive', 'RealReboot')
        except Exception as e:
            # bind the exception so the log shows why the lookup failed
            logging.debug('no RealReboot found, returning false (%s) ' % e)
            return False

    def error(self, summary, msg, extended_msg=None):
        """Log an error instead of displaying it."""
        logging.error('%s %s (%s)' % (summary, msg, extended_msg))

    def abort(self):
        """Log that abort was requested."""
        logging.error('view.abort called')
-
-
if __name__ == '__main__':
    # Manual test driver: build the view and both progress objects,
    # exercise the maintainer-script error path once, then install the
    # packages named on the command line.
    view = DistUpgradeViewNonInteractive()
    fetch_progress = NonInteractiveFetchProgress()
    install_progress = NonInteractiveInstallProgress()
    install_progress.error('xserver-xorg', 'pre-installation script failed')

    cache = apt.Cache()
    for package_name in sys.argv[1:]:
        cache[package_name].markInstall()
    cache.commit(fetch_progress, install_progress)

    time.sleep(2)
    sys.exit(0)
-
-